﻿using System;
using Core.Framework.Loger;
using Core.Framework.Model.Common;
using Core.Framework.Model.WebSockets;
using Core.Framework.Redis.Queue_Helper;
using Core.Framework.Util;
using Core.IBusiness.ISocketModel;
using Core.Service.TaskHandle;
using Newtonsoft.Json;
using System.Threading;
using System.Threading.Tasks;

namespace Core.Middleware.WebSockets
{
    public class Consumer
    {
        /// <summary>
        /// 等待时间
        /// </summary>
        private int sleep { get; set; } = 1000 / 10;

        /// <summary>
        /// 消息备份标记
        /// </summary>
        private int msgBackupFlag { get; set; } = 0;



        /// <summary>
        /// 并发处理
        /// </summary>
        /// <param name="threadLength">线程数量</param>
        /// <param name="sleep">执行完成后等待时间/ms [若消息为空 将休眠此基数的6倍]</param>
        public void TimedLoop(int threadLength, int sleep)
        {

            this.sleep = sleep;

            while (true)
            {
                try
                {
                    var values = RedisQueueHelper.SortedPop(RedisQueueHelperParameter.Queue);

                    if (values.Length > 0)
                    {
                        Parallel.ForEach(values, new ParallelOptions { MaxDegreeOfParallelism = threadLength }, item =>
                        {
                            MessageQueue value = ((string)item).TryToEntity<MessageQueue>(out string errmsg);
                            this.ExecTask(value);
                            RunningLoger.Queue(item);
                        });

                        msgBackupFlag++;

                        if (msgBackupFlag > 30)
                        {
                            msgBackupFlag = 0;
                            this.TimerJobMethodMsgBackups();
                        }

                        Thread.Sleep(this.sleep);
                    }
                    else
                    {
                        Thread.Sleep(sleep * 5);
                    }
                }
                catch (Exception e)
                {
                    RunningLoger.Error($"TimedLoop:{e.Message}");
                    Thread.Sleep(sleep * 1);
                }

                
            }
        }

        /// <summary>
        /// 获取任务执行备份
        /// </summary>
        void TimerJobMethodMsgBackups()
        {
            ISocketMessage iQueueMessage =
                BuildServiceProvider.GetService<ISocketMessage>();

            iQueueMessage.SaveSingleOffLine(RedisQueueHelper.GetListPopByLength(RedisQueueHelperParameter.SingleNull, 0, 100));

            iQueueMessage.SaveHistory(RedisQueueHelper.GetListPopByLength(RedisQueueHelperParameter.History, 0, 100 * 3));
        }

        /// <summary>
        /// 任务处理
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        bool ExecTask(MessageQueue message)
        {
            bool result = false;

            if (null != message)
            {
                TaskHandleFactory.DoDispatch(message,a => a.Template == "*" || a.Template.ToLower() == message.Template.ToLower(),
                    (template, error) =>
                    {
                        RunningLoger.Error($"DoDispatch:为匹配到相关控制器:template:{template},error:{error},message:{message.TryToJson()}");

                    });
                result = true;
            }

            return result;
        }

        enum TimerJobMethodEnum
        {
            Null = 0,
            Success = 200,
            Error = 400
        }
    }
}
